package com.dahaonetwork.smartfactory.websocket.util;

import java.io.IOException;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.junit.Test;

import com.alibaba.fastjson.JSON;
import com.dahaonetwork.smartfactory.websocket.base.PushMessageThread;
import com.dahaonetwork.smartfactory.websocket.base.WebSocket;
import com.dahaonetwork.smartfactory.websocket.model.Message;

import javax.servlet.ServletException;

@WebServlet(urlPatterns = "/WebSocketDispatcher")
public class WebSocketDispatcher extends HttpServlet{
	/** 序列号 */
	private static final long serialVersionUID = 1L;
	/** 线程池 */
	private static ExecutorService execPool = Executors.newCachedThreadPool();
	
	public void init() throws ServletException {
		super.init();
		//init ThreadPool
		//execPool = Executors.newCachedThreadPool();
	}

	protected void doGet(HttpServletRequest request, HttpServletResponse resp)
			throws ServletException, IOException {
		this.doPost(request, resp);
	}
	
	protected void doPost(HttpServletRequest request, HttpServletResponse response)
			throws ServletException, IOException {
		
		//获取message json串
		String message = request.getParameter("message");
		//得到message对象
		//JSON json=JSON.parseObject(message);
		Message msg =JSON.parseObject(message, Message.class);
		WebSocketDispatcher.sendMessage(msg, response);
		//testThread();
	}
	
	/**
	 * @throws IOException 
	 * @Title: sendMessage
	 * @Description: 向指定客户端进行消息的分发
	 * @param key 如workflow/history
	 * @param message 消息体
	 * @return void
	 * @throws
	 */
	public static void sendMessage(Message msg, HttpServletResponse response) 
			throws IOException{
		HashMap<String, HashMap<String, CopyOnWriteArraySet<WebSocket>>> onlineSocket = 
				WebSocket.WebSocketCollection.webSocketMap;
		//get current topic websocket collection
		HashMap<String, CopyOnWriteArraySet<WebSocket>> identities = 
				onlineSocket.get(msg.getTopicId());
		//if not null
		if(identities != null){
			//Localtion the WebSocket collection
			Set<WebSocket> reciver = identities.get(msg.getIdentityId());
			if(reciver != null){
				//start thread to push message
				PushMessageThread newPush = new PushMessageThread(reciver,msg);
				//newPush.start();
				execPool.submit(newPush);
			}
		}
	}
	
	/**
	 * TODO
	 * @param 
	 * @return void
	 * @throws
	 */
	@Test
	public void testThread(){
		class TestThread implements Runnable{
			private int index;
			private WebSocketDispatcher dispatcher;
			public TestThread(int index, WebSocketDispatcher dispatcher){
				this.index = index;
				this.dispatcher = dispatcher;
			}
			public void run() {
				Message msg = new Message("echo", "123lgb", "lgb send "+index, "lgb","whole");
				try {
					WebSocketDispatcher.sendMessage(msg, null);
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			
		}
		int i = 1000;
		while(i-->0){
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(i);
			TestThread test = new TestThread(i, this);
			test.run();
		}
		
	}

}
